<html lang="en">
<head>
<title>Akka Distributed Workers - Activator Template</title>
</head>
<body>
	<div>
		<h2>The Goal</h2>

		<p>
		Some applications need to distribute work across many machines because a single machine
		has limited resources. This tutorial explains how to implement
		distributed workers with Akka cluster features.
		</p>

		<p><img src="tutorial/img1.png" border="0" /></p>

		<p>
		The solution should support:
		</p>
		<ul>
		<li>elastic addition/removal of frontend nodes that receive work from clients</li>
		<li>elastic addition/removal of worker actors and worker nodes</li>
        <li>thousands of workers</li>
		<li>jobs should not be lost, and if a worker fails, the job should be retried</li>
		</ul>

		<p>
		The design is based on Derek Wyatt's blog post
		<a href="http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2"
		target="_blank">Balancing Workload Across Nodes with Akka 2</a>. This article describes
		the advantages of letting the workers pull work from the master instead of pushing work to
		the workers.
        </p>

	</div>
	<div>
		<h2>Explore the Code - Master</h2>

		<p>
		The heart of the solution is the Master actor that manages outstanding work
		and notifies registered workers when new work is available.
		</p>

		<p>
		The Master actor is a singleton within the nodes with role "backend" in the cluster.
		This means that there will be one active master actor in the cluster. It runs on the
		oldest node.
		</p>

		<p>
		You can see how the master singleton is started in the method <code>startBackend</code>
		in <a href="#code/src/main/scala/worker/Main.scala" class="shortcut">Main.scala</a>.
		</p>

		<p><img src="tutorial/img2.png" border="0" /></p>

		<p>
		In case of failure of the master node another master actor is automatically started on
		a standby node. The master on the standby node takes over the responsibility for
		outstanding work. Work in progress can continue and will be reported to the new master.
		The state of the master can be re-created on the standby node using event sourcing.
		An alternative to event sourcing and the singleton master would be to keep track of all
		jobs in a central database, but that is more complicated and not as scalable. At the end
		of the tutorial we will describe how multiple masters can be supported with a small adjustment.
		</p>

		<p>
		The master actor is made available for workers by registering itself
		in the <a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-client.html"
		target="_blank">ClusterReceptionist</a>.
		</p>
		
		<p>
		The frontend actor talks to the master actor via the
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-singleton.html"
		target="_blank">ClusterSingletonProxy</a>.
		</p>

		<p>
		Later we will explore the implementation of the <a href="#code/src/main/scala/worker/Master.scala" class="shortcut">Master</a>
		actor in depth, but first we will take a look at the frontend and worker that interact with the master.
		</p>

	</div>
	<div>
		<h2>Explore the Code - Front End</h2>

		<p>
		A typical frontend provides a RESTful API that is used by the clients to submit (POST) jobs.
		When the service has accepted the job it returns a 201 Created response code to the client.
		If it can't accept the job it returns a failure response code and the client has to retry or
		discard the job.
		</p>

		<p>
		In this example the frontend is emulated, for simplicity, by an ordinary actor (see
		<a href="#code/src/main/scala/worker/Frontend.scala" class="shortcut">Frontend.scala</a>),
		and client requests are simulated by
		<a href="#code/src/main/scala/worker/WorkProducer.scala" class="shortcut">WorkProducer.scala</a>.
		As you can see the <code>Frontend</code> actor sends the work to the active master via the
		<code>ClusterSingletonProxy</code>. It doesn't care about the exact location of the
		master. Somewhere in the cluster there should be one master actor running.
		The message is sent with <code>ask/?</code> to be able to reply to the client (<code>WorkProducer</code>)
		when the job has been accepted or denied by the master.
		</p>
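
		<p>
		The reply handling described above can be sketched in plain Scala futures, without Akka.
		This is only an illustration of the idea, not the code in <code>Frontend.scala</code>:
		the names <code>FrontendSketch</code>, <code>submit</code>, <code>askMaster</code>,
		<code>Ok</code> and <code>NotOk</code> are assumptions made for this example, and a failed
		future stands in for an ask timeout.
		</p>

<pre><code>
import scala.concurrent.{ExecutionContext, Future}

object FrontendSketch {
  // Acknowledgement from the master once the job has been accepted.
  final case class Ack(workId: String)

  // Hypothetical replies from the frontend to the client.
  sealed trait ClientReply
  case object Ok extends ClientReply
  case object NotOk extends ClientReply

  // askMaster stands in for sending the work with ask/? via the proxy.
  // Any failure of the future (for example a timeout) becomes NotOk,
  // which tells the client to retry or discard the job.
  def submit(workId: String, askMaster: String =&gt; Future[Ack])(
      implicit ec: ExecutionContext): Future[ClientReply] =
    askMaster(workId)
      .map[ClientReply](_ =&gt; Ok)
      .recover { case _ =&gt; NotOk }
}
</code></pre>

		<p>
		With this shape the client always gets a definite answer: either the master has
		acknowledged the job, or the client is told that it has to retry or discard it.
		</p>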

		<p><img src="tutorial/img5.png" border="0" /></p>

		<p>
		You can see how the Frontend and WorkProducer actors are started in the method <code>startFrontend</code>
		in <a href="#code/src/main/scala/worker/Main.scala" class="shortcut">Main.scala</a>.
		</p>

	</div>

	<div>
		<h2>Explore the Code - Worker</h2>

		<p>
		We should support many worker nodes and we assume that they can be unstable. Therefore the
		worker nodes are not members of the cluster. Instead they communicate with the cluster
		through the <a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-client.html"
		target="_blank">Cluster Client</a>. The worker doesn't have to know exactly where the master
		is located.
		</p>

		<p><img src="tutorial/img6.png" border="0" /></p>

		<p>
		You can see how a worker is started in the method <code>startWorker</code>
		in <a href="#code/src/main/scala/worker/Main.scala" class="shortcut">Main.scala</a>.
		</p>

		<p>
		Open <a href="#code/src/main/scala/worker/Worker.scala" class="shortcut">Worker.scala</a>.
		</p>

		<p>
		The worker registers itself periodically with the master, see <code>registerTask</code>.
		This has the nice characteristic that master and worker can be started in any order, and
		in case of master failover the worker re-registers itself with the new master.
		</p>

		<p><img src="tutorial/img7.png" border="0" /></p>

		<p>
		The Frontend actor sends the work to the master actor.
		</p>

		<p><img src="tutorial/img8.png" border="0" /></p>

		<p>
		When the worker receives work from the master it delegates the actual processing to
		a child actor, <a href="#code/src/main/scala/worker/WorkExecutor.scala" class="shortcut">WorkExecutor</a>,
		to keep the worker responsive while executing the work.
		</p>

		<p><img src="tutorial/img9.png" border="0" /></p>

	</div>
	<div>
		<h2>Explore the Code - Master Revisited</h2>

		<p>
		Now that we know more about the Worker and Frontend that interact with the Master, it
		is time to take a closer look at
		<a href="#code/src/main/scala/worker/Master.scala" class="shortcut">Master.scala</a>.
		</p>

		<p>
		Workers register themselves with the master using <code>RegisterWorker</code>. Each worker
		has a unique identifier and the master keeps track of the workers, including the current
		<code>ActorRef</code> (sender of the <code>RegisterWorker</code> message) that can be used
		for sending notifications to the worker. This <code>ActorRef</code> is not a direct
		link to the worker actor, but messages sent to it will be delivered to the worker.
		When using the cluster client, messages are tunneled via the receptionist on some
		node in the cluster to avoid inbound connections from other cluster nodes to the client.
		</p>
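
		<p>
		Because workers register periodically, registration has to be safe to repeat. A minimal
		sketch of such a registry, in plain Scala, could look as follows. The names
		<code>WorkerRegistrySketch</code>, <code>WorkerEntry</code>, <code>Idle</code>,
		<code>Busy</code> and <code>register</code> are assumptions for this illustration, and a
		<code>String</code> stands in for the <code>ActorRef</code>.
		</p>

<pre><code>
object WorkerRegistrySketch {
  sealed trait WorkerStatus
  case object Idle extends WorkerStatus
  final case class Busy(workId: String) extends WorkerStatus

  // ref is a plain String here; in the actor it would be the sender ActorRef.
  final case class WorkerEntry(ref: String, status: WorkerStatus)

  // Registering is idempotent: a known worker keeps its status and only the
  // reference is refreshed, an unknown worker is added as idle.
  def register(
      workers: Map[String, WorkerEntry],
      workerId: String,
      ref: String): Map[String, WorkerEntry] =
    workers.get(workerId) match {
      case Some(entry) =&gt; workers + (workerId -&gt; entry.copy(ref = ref))
      case None        =&gt; workers + (workerId -&gt; WorkerEntry(ref, Idle))
    }
}
</code></pre>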

		<p>
		When the master receives <code>Work</code> from the frontend it adds the work item to
		the queue of pending work and notifies idle workers with a <code>WorkIsReady</code> message.
		</p>

		<p>
		To be able to restore the same state in case of failover to a standby master actor, the
		changes (domain events) are stored in an append-only transaction log and can be replayed
		when the standby actor is started.
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/persistence.html" target="_blank">Akka Persistence</a>
		is used for that. <a href="#code/src/main/scala/worker/Master.scala" class="shortcut">Master</a> extends
		<code>PersistentActor</code> and events are stored with calls to the <code>persist</code>
		method. When the domain event has been saved successfully the master replies
		with an acknowledgement message (<code>Ack</code>) to the frontend.
		The master also keeps track of accepted work identifiers to be able to discard duplicates
		sent from the frontend.
		</p>
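
		<p>
		The event sourcing idea can be illustrated without Akka Persistence: the state is only
		changed by applying events, so replaying the stored events rebuilds it. The following is a
		sketch under that assumption, not the implementation in <code>Master.scala</code>. The
		names <code>WorkStateSketch</code>, <code>WorkAccepted</code>, <code>WorkStarted</code>,
		<code>WorkCompleted</code>, <code>WorkState</code> and <code>replay</code> are hypothetical,
		and a <code>Seq</code> stands in for the journal.
		</p>

<pre><code>
import scala.collection.immutable.Queue

object WorkStateSketch {
  final case class Work(workId: String, job: String)

  // Domain events that would be written to the journal.
  sealed trait WorkDomainEvent
  final case class WorkAccepted(work: Work) extends WorkDomainEvent
  final case class WorkStarted(workId: String) extends WorkDomainEvent
  final case class WorkCompleted(workId: String) extends WorkDomainEvent

  final case class WorkState(
      pending: Queue[Work] = Queue.empty,
      inProgress: Map[String, Work] = Map.empty,
      acceptedIds: Set[String] = Set.empty) {

    // Used to discard duplicates sent from the frontend.
    def isAccepted(workId: String): Boolean = acceptedIds.contains(workId)

    // Pure state transition, used both after persisting and during replay.
    def updated(event: WorkDomainEvent): WorkState = event match {
      case WorkAccepted(work) =&gt;
        copy(pending = pending.enqueue(work), acceptedIds = acceptedIds + work.workId)
      case WorkStarted(workId) =&gt;
        val (work, rest) = pending.dequeue
        require(work.workId == workId, s"expected $workId at the head of the queue")
        copy(pending = rest, inProgress = inProgress + (workId -&gt; work))
      case WorkCompleted(workId) =&gt;
        copy(inProgress = inProgress - workId)
    }
  }

  // Replaying the journal rebuilds the state, as a standby master would do.
  def replay(journal: Seq[WorkDomainEvent]): WorkState =
    journal.foldLeft(WorkState())((state, event) =&gt; state.updated(event))
}
</code></pre>

		<p>
		Keeping the transition in one pure method means that the live master and a recovering
		standby master cannot drift apart, since both apply exactly the same function to the
		same events.
		</p>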

		<p><img src="tutorial/img8.png" border="0" /></p>

		<p>
		When a worker receives <code>WorkIsReady</code> it sends back <code>WorkerRequestsWork</code>
		to the master, which hands out the work, if any, to the worker. The master keeps track of the
		fact that the worker is busy and expects a result within a deadline. For long running jobs the worker
		could send progress messages, but that is not implemented in the example.
		</p>
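
		<p>
		The hand-out and deadline bookkeeping can be sketched as two pure functions. This is an
		illustration only: the names <code>WorkHandoutSketch</code>, <code>requestWork</code> and
		<code>expire</code> are hypothetical, time is passed in as milliseconds to keep the example
		deterministic, and putting an overdue job back in the queue while marking the worker idle
		is an assumption about how a missed deadline could be handled.
		</p>

<pre><code>
object WorkHandoutSketch {
  sealed trait WorkerStatus
  case object Idle extends WorkerStatus
  // deadlineMillis is the time at which the master gives up on the result.
  final case class Busy(workId: String, deadlineMillis: Long) extends WorkerStatus

  final case class State(pending: List[String], workers: Map[String, WorkerStatus])

  // Handle WorkerRequestsWork: hand out the first pending job, if any,
  // to an idle worker and remember when the result is due.
  def requestWork(
      state: State,
      workerId: String,
      nowMillis: Long,
      timeoutMillis: Long): (State, Option[String]) =
    (state.pending, state.workers.get(workerId)) match {
      case (workId :: rest, Some(Idle)) =&gt;
        val busy = Busy(workId, nowMillis + timeoutMillis)
        (State(rest, state.workers + (workerId -&gt; busy)), Some(workId))
      case _ =&gt;
        (state, None)
    }

  // Periodic check: jobs whose deadline has passed go back to the queue
  // and the worker is marked idle again, so the job can be retried.
  def expire(state: State, nowMillis: Long): State =
    state.workers.foldLeft(state) {
      case (s, (workerId, Busy(workId, deadline))) if deadline &lt;= nowMillis =&gt;
        State(s.pending :+ workId, s.workers + (workerId -&gt; Idle))
      case (s, _) =&gt; s
    }
}
</code></pre>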

		<p><img src="tutorial/img9.png" border="0" /></p>

		<p>
		When the worker sends <code>WorkIsDone</code> the master updates its state of the worker
		and sends an acknowledgement back to the worker. Handling of this message must also be
		idempotent, as the worker will re-send it if it doesn't receive the acknowledgement.
		</p>
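
		<p>
		Idempotent handling of a repeated <code>WorkIsDone</code> could look roughly like the
		following sketch. The names <code>WorkIsDoneSketch</code>, <code>State</code>,
		<code>Ignored</code> and <code>workIsDone</code> are assumptions for this example, and
		ignoring unknown work identifiers is one possible choice rather than the behaviour of
		<code>Master.scala</code>.
		</p>

<pre><code>
object WorkIsDoneSketch {
  final case class State(inProgress: Set[String], done: Set[String])

  sealed trait Reply
  final case class Ack(workId: String) extends Reply
  case object Ignored extends Reply

  def workIsDone(state: State, workId: String): (State, Reply) =
    if (state.done.contains(workId))
      // Duplicate: the earlier Ack was probably lost, so acknowledge again
      // without changing the state.
      (state, Ack(workId))
    else if (state.inProgress.contains(workId))
      (State(state.inProgress - workId, state.done + workId), Ack(workId))
    else
      // Unknown work identifier: nothing to update.
      (state, Ignored)
}
</code></pre>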

		<p><img src="tutorial/img10.png" border="0" /></p>

	</div>
	<div>
		<h2>Summary</h2>

		<p>
		The <a href="#code/src/main/scala/worker/Master.scala" class="shortcut">Master</a> actor
		is a <a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-singleton.html"
		target="_blank">Cluster Singleton</a> and registers itself in the
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-client.html" target="_blank">Cluster Receptionist</a>.
		The <code>Master</code> uses
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/persistence.html" target="_blank">Akka Persistence</a>
		to store incoming jobs and state.
		</p>

		<p>
		The <a href="#code/src/main/scala/worker/Frontend.scala" class="shortcut">Frontend</a> actor sends work
		to the master via the <code>ClusterSingletonProxy</code>.
		</p>

		<p>
		The <a href="#code/src/main/scala/worker/Worker.scala" class="shortcut">Worker</a> communicates with the
		cluster and its master with the <a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-client.html"
		target="_blank">Cluster Client</a>.
		</p>

		<p><img src="tutorial/img11.png" border="0" /></p>

	</div>	
	<div>
		<h2>Run the Application</h2>

		<p>
			Open the <a href="#run" class="shortcut">Run</a> tab and select <code>worker.Main</code> followed
			by Restart. On the left-hand side we can see the console output, which is logging output
			from nodes joining the cluster, the simulated work and results.
		</p>

<p>
The <a href="#code/src/main/scala/worker/Main.scala" class="shortcut">worker.Main</a>
starts three actor systems in the same JVM process. It can be more
interesting to run them in separate processes. <b>Stop</b> the application in the
<a href="#run" class="shortcut">Run</a> tab and then open three terminal windows.
</p>

<p>
In the first terminal window, start the first seed node with the following command:
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "runMain worker.Main 2551"
</code></pre>

<p>
2551 corresponds to the port of the first seed-nodes element in the configuration. In the log
output you see that the cluster node has been started and changed status to 'Up'.
</p>

<p>
In the second terminal window, start the frontend node with the following command:
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "runMain worker.Main 3001"
</code></pre>

<p>
3001 is the port of the node. In the log output you see that the cluster node has been started,
joins the 2551 node and becomes a member of the cluster. Its status changes to 'Up'.
</p>

<p>
Switch over to the first terminal window and see in the log output that the member joined.
So far, no <code>Worker</code> has been started, i.e. jobs are produced and accepted but
not processed.
</p>

<p>
In the third terminal window, start a worker node with the following command:
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "runMain worker.Main 0"
</code></pre>

<p>
This time you don't need to pick a port number; 0 means that a random available port is used.
This worker node is not part of the cluster, but it connects to one of the configured cluster
nodes via the <code>ClusterClient</code>. Look at the log output in the different terminal
windows. In the second window (frontend) you should see that the produced jobs are processed
and logged as <code>"Consumed result"</code>.
</p>

<p>
Take a look at the logging that is done in 
<a href="#code/src/main/scala/worker/WorkProducer.scala" class="shortcut">WorkProducer.scala</a>,
<a href="#code/src/main/scala/worker/Master.scala" class="shortcut">Master</a>
and <a href="#code/src/main/scala/worker/Worker.scala" class="shortcut">Worker.scala</a>.
Identify the corresponding log entries in the 3 terminal windows.
</p>

<p>
Shut down the worker node (third terminal window) with <code>ctrl-c</code>.
Observe how the <code>"Consumed result"</code> logs in the frontend node (second terminal window)
stop. Start the worker node again.
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "runMain worker.Main 0"
</code></pre>

<p>
You can also start more such worker nodes in new terminal windows.
</p>

<p>
You can start more cluster backend nodes using port numbers between 2000 and 2999:
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "runMain worker.Main 2552"
</code></pre>

<p>
You can start more cluster frontend nodes using port numbers between 3000 and 3999:
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "runMain worker.Main 3002"
</code></pre>
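
<p>
The port conventions used in the commands above can be summarised as a small function. This
is a sketch for orientation, not the code in <code>Main.scala</code>: the names
<code>PortRolesSketch</code>, <code>NodeKind</code> and <code>nodeKind</code> are hypothetical, and
treating every port outside the two ranges (not just 0) as a worker is an assumption.
</p>

<pre><code>
object PortRolesSketch {
  sealed trait NodeKind
  case object Backend extends NodeKind
  case object Frontend extends NodeKind
  case object Worker extends NodeKind

  // 2000-2999 starts a backend node, 3000-3999 a frontend node,
  // anything else (for example 0) a worker node.
  def nodeKind(port: Int): NodeKind =
    if ((2000 to 2999).contains(port)) Backend
    else if ((3000 to 3999).contains(port)) Frontend
    else Worker
}
</code></pre>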

<p>
Note that this sample runs the 
<a href="http://doc.akka.io/docs/akka/2.4.0/scala/persistence.html#Shared_LevelDB_journal" target="_blank">shared LevelDB journal</a>
on the node with port 2551. This is a single point of failure, and should not be used in production. 
A real system would use a 
<a href="http://akka.io/community/" target="_blank">distributed journal</a>.
</p>

<p>
The files of the shared journal are saved in the target directory and when you restart
the application the state is recovered. You can clean the state with:
</p>

<pre><code>
&lt;path to activator dir&gt;/activator clean
</code></pre>


	</div>
	<div>
		<h2>Many Masters</h2>

		<p>
		If the singleton master becomes a bottleneck we can start several master actors and
		shard the jobs among them. For each shard of master/standby nodes we use a separate
		cluster role name, e.g. "backend-shard1", "backend-shard2".
		Implementation of this is left as an exercise.
		</p>
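
		<p>
		As a starting point for that exercise, one way to pick the shard for a job is to hash its
		identifier onto a role name. This is only a sketch of one possible approach: the names
		<code>ShardRoleSketch</code> and <code>shardRole</code> are hypothetical, and hashing the
		work identifier is an assumption, not something the template prescribes.
		</p>

<pre><code>
object ShardRoleSketch {
  // Maps a work id to one of the role names "backend-shard1" .. "backend-shardN".
  def shardRole(workId: String, numShards: Int): String = {
    require(numShards &gt; 0, "numShards must be positive")
    // floorMod keeps the result non-negative even for negative hash codes.
    val shard = java.lang.Math.floorMod(workId.hashCode, numShards) + 1
    s"backend-shard$shard"
  }
}
</code></pre>

		<p>
		The same work identifier always maps to the same role, so a retried job from the frontend
		would reach the master that can recognise it as a duplicate.
		</p>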

	</div>
	<div>
		<h2>Next Steps</h2>

		<p>
		In this example we have used
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-singleton.html"
		target="_blank">Cluster Singleton</a>,
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-client.html"
		target="_blank">Cluster Client</a> and
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/distributed-pub-sub.html"
		target="_blank">Distributed Publish Subscribe</a>.
		</p>

		<p>
		More in-depth documentation can be found in the
		<a href="http://doc.akka.io/docs/akka/2.4.0/common/cluster.html"
		target="_blank">Cluster Specification</a> and in the
		<a href="http://doc.akka.io/docs/akka/2.4.0/scala/cluster-usage.html"
		target="_blank">Cluster Usage</a> documentation.
		</p>

	</div>

</body>
</html>
